bitkeeper revision 1.1713.3.8 (42b2f91bG45uUFWHhUaUha3e1OAxJQ)
authorcl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>
Fri, 17 Jun 2005 16:23:55 +0000 (16:23 +0000)
committercl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>
Fri, 17 Jun 2005 16:23:55 +0000 (16:23 +0000)
xsnode.py:
  Updated watches/event code from Mike Wray.
Signed-off-by: Mike Wray <mike.wray@hp.com>
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
tools/python/xen/xend/xenstore/xsnode.py

index ae770219aba4022d17b4123aecfb8d432aa94c02..f9721caf1b2674b5cc7eba77a726015ec1ab121c 100644 (file)
@@ -2,7 +2,9 @@ import errno
 import os
 import os.path
 import select
+import socket
 import sys
+import threading
 import time
 
 from xen.lowlevel import xs
@@ -12,18 +14,26 @@ from xen.xend.PrettyPrint import prettyprint
 SELECT_TIMEOUT = 2.0
 
 def getEventPath(event):
-    return os.path.join("/_event", event)
+    if event and event.startswith("/"):
+        event = event[1:]
+    return os.path.join("/event", event)
 
 def getEventIdPath(event):
-    return os.path.join(eventPath(event), "@eid")
+    return os.path.join(getEventPath(event), "@eid")
 
 class Subscription:
 
-    def __init__(self, event, fn, id):
-        self.event = event
+    def __init__(self, path, fn, sid):
+        self.path = path
         self.watcher = None
         self.fn = fn
-        self.id = id
+        self.sid = sid
+
+    def getPath(self):
+        return self.path
+
+    def getSid(self):
+        return self.sid
 
     def watch(self, watcher):
         self.watcher = watcher
@@ -34,10 +44,11 @@ class Subscription:
         if watcher:
             self.watcher = None
             watcher.delSubs(self)
+        return watcher
 
-    def notify(self, event):
+    def notify(self, path, val):
         try:
-            self.fn(event, id)
+            self.fn(self, path, val)
         except SystemExitException:
             raise
         except:
@@ -45,45 +56,45 @@ class Subscription:
 
 class Watcher:
 
-    def __init__(self, store, event):
-        self.path = getEventPath(event)
-        self.eidPath = getEventIdPath(event)
+    def __init__(self, store, path):
+        self.path = path
         store.mkdirs(self.path)
-        if not store.exists(self.eidPath):
-            store.writeInt(self.eidPath, 0)
         self.xs = None
-        self.subs = []
+        self.subscriptions = []
 
-    def __getattr__(self, k, v):
-        if k == "fileno":
-            if self.xs:
-                return self.xs.fileno
-            else:
-                return -1
+    def fileno(self):
+        if self.xs:
+            return self.xs.fileno
         else:
-            return self.__dict__.get(k, v)
+            return -1
+
+    def getPath(self):
+        return self.path
 
     def addSubs(self, subs):
-        self.subs.append(subs)
+        self.subscriptions.append(subs)
         self.watch()
 
     def delSubs(self, subs):
-        self.subs.remove(subs)
-        if len(self.subs) == 0:
+        self.subscriptions.remove(subs)
+        if len(self.subscriptions) == 0:
             self.unwatch()
 
-    def getEvent(self):
-        return self.event
-
     def watch(self):
         if self.xs: return
         self.xs = xs.open()
-        self.xs.watch(path)
+        self.xs.watch(self.path)
 
     def unwatch(self):
         if self.xs:
-            self.xs.unwatch(self.path)
-            self.xs.close()
+            try:
+                self.xs.unwatch(self.path)
+            except Exception, ex:
+                print 'Watcher>unwatch>', ex
+            try:
+                self.xs.close()
+            except Exception, ex:
+                pass
             self.xs = None
             
     def watching(self):
@@ -92,22 +103,38 @@ class Watcher:
     def getNotification(self):
         p = self.xs.read_watch()
         self.xs.acknowledge_watch()
-        eid = self.xs.readInt(self.eidPath)
         return p
 
-    def notify(self, subs):
-        p = self.getNotification()
-        for s in subs:
-            s.notify(p)
-            
+    def notify(self):
+        try:
+            p = self.getNotification()
+            v = self.xs.read(p)
+            for s in subscriptions:
+                s.notify(p, v)
+        except Exception, ex:
+            print 'Notify exception:', ex
+
+class EventWatcher(Watcher):
+
+    def __init__(self, store, path, event):
+        Watcher.__init__(self, store, path)
+        self.event = event
+        self.eidPath = getEventIdPath(event)
+        if not store.exists(self.eidPath):
+            store.write(self.eidPath, str(0))
+
+    def getEvent(self):
+        return self.event
+
 class XenStore:
 
+    xs = None
+    watchThread = None
+    subscription_id = 1
+    
     def __init__(self):
-        self.xs = None
-        #self.xs = xs.open()
-        self.subscription = {}
-        self.subscription_id = 0
-        self.events = {}
+        self.subscriptions = {}
+        self.watchers = {}
         self.write("/", "")
 
     def getxs(self):
@@ -119,8 +146,8 @@ class XenStore:
                     ex = None
                     break
                 except Exception, ex:
-                    print >>stderr, "Exception connecting to xsdaemon:", ex
-                    print >>stderr, "Trying again..."
+                    print >>sys.stderr, "Exception connecting to xsdaemon:", ex
+                    print >>sys.stderr, "Trying again..."
                     time.sleep(1)
             else:
                 raise ex
@@ -217,70 +244,85 @@ class XenStore:
         self.getxs().write(path, data, create=create, excl=excl)
 
     def begin(self, path):
-        self.getxs().begin_transaction(path)
+        self.getxs().transaction_start(path)
 
     def commit(self, abandon=False):
-        self.getxs().end_transaction(abort=abandon)
+        self.getxs().transaction_end(abort=abandon)
+
+    def watch(self, path, fn):
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(Watcher(self, path))
+        return self.addSubscription(watcher, fn)
+        
+    def unwatch(self, sid):
+        s = self.subscriptions.get(sid)
+        if not s: return
+        del self.subscriptions[s.sid]
+        watcher = s.unwatch()
+        if watcher and not watcher.watching():
+            del self.watchers[path]
 
     def subscribe(self, event, fn):
-        watcher = self.watchEvent(event)
-        self.subscription_id += 1
-        subs = Subscription(event, fn, self.subscription_id)
-        self.subscription[subs.id] = subs
-        subs.watch(watcher)
-        return subs.id
+        path = getEventPath(event)
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(EventWatcher(self, path, event))
+        return self.addSubscription(watcher, fn)
 
-    def unsubscribe(self, sid):
-        s = self.subscription.get(sid)
-        if not s: return
-        del self.subscription[s.id]
-        s.unwatch()
-        unwatchEvent(s.event)
+    unsubscribe = unwatch
 
     def sendEvent(self, event, data):
         eventPath = getEventPath(event)
         eidPath = getEventIdPath(event)
         try:
-            self.begin(eventPath)
+            #self.begin(eventPath)
             self.mkdirs(eventPath)
+            eid = 1
             if self.exists(eidPath):
-                eid = self.readInt(eidPath)
-                eid += 1
-            else:
-                eid = 1
-            self.writeInt(eidPath, eid)
+                data = self.read(eidPath)
+                print 'sendEvent>', 'data=', data, type(data)
+                try:
+                    eid = int(self.read(eidPath))
+                    eid += 1
+                except Exception, ex:
+                    print 'sendEvent>', ex
+                    pass
+            self.write(eidPath, str(eid))
             self.write(os.path.join(eventPath, str(eid)), data)
         finally:
-            self.commit()
+            #self.commit()
+            pass
 
-    def watchEvent(self, event):
-        if event in  self.events:
-            return
-        watcher = Watcher(event)
-        self.watchers[watcher.getEvent()] = watcher
+    def addWatcher(self, watcher):
+        self.watchers[watcher.getPath()] = watcher
         self.watchStart()
         return watcher
 
-    def unwatchEvent(self, event):
-        watcher = self.watchers.get(event)
-        if not watcher:
-            return
-        if not watcher.watching():
-            del self.watchers[event]
+    def addSubscription(self, watcher, fn):
+        self.subscription_id += 1
+        subs = Subscription(watcher.getPath(), fn, self.subscription_id)
+        self.subscriptions[subs.sid] = subs
+        subs.watch(watcher)
+        return subs.sid
 
     def watchStart(self):
         if self.watchThread: return
-
+        self.watchThread = threading.Thread(name="Watcher",
+                                            target=self.watchMain)
+        self.watchThread.setDaemon(True)
+        self.watchThread.start()
+        
     def watchMain(self):
         try:
             while True:
                 if self.watchThread is None: return
-                if not self.events:
+                if not self.watchers:
                     return
                 rd = self.watchers.values()
                 try:
-                    (rd, wr, er) = select.select(rd, [], [], SELECT_TIMEOUT)
-                    for watcher in rd:
+                    (srd, swr, ser) = select.select(rd, [], [], SELECT_TIMEOUT)
+                    for watcher in srd:
                         watcher.notify()
                 except socket.error, ex:
                     if ex.args[0] in (EAGAIN, EINTR):
@@ -315,6 +357,9 @@ class XenNode:
             else:
                 raise ValueError("path does not exist: '%s'" % path)
 
+    def getStore(self):
+        return self.store
+
     def relPath(self, path=""):
         if not path:
             return self.path
@@ -376,6 +421,24 @@ class XenNode:
     def releaseDomain(self, dom):
         self.store.releaseDomain(dom)
 
+    def watch(self, fn, path=""):
+        """Watch a path for changes. The path is relative
+        to the node and defaults to the node itself.
+        """
+        return self.store.watch(self.relPath(path), fn)
+
+    def unwatch(self, sid):
+        return self.store.unwatch(sid)
+
+    def subscribe(self, event, fn):
+        return self.store.subscribe(event, fn)
+
+    def unsubscribe(self, sid):
+        self.store.unsubscribe(sid)
+
+    def sendEvent(self, event, data):
+        return self.store.sendEvent(event, data)
+
     def __repr__(self):
         return "<XenNode %s>" % self.path